Feature/mlflow dataset#6
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (2)
📝 WalkthroughWalkthroughAdds a new ChangesMLflow ParquetDataset
Type Annotations, Suppression Migrations, and Code Cleanup
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces the ParquetDataset class and a from_parquet factory function to support lazy-loaded Parquet datasets within MLflow. The implementation includes metadata-based digest computation, schema inference, and dataset profiling. Feedback was provided to correct a docstring typo, ensure the target column is included in both the dataset digest and the configuration dictionary for accurate tracking, remove a redundant property override, and cache the profile property to optimize performance by avoiding repeated I/O.
There was a problem hiding this comment.
Pull request overview
Adds an MLflow Dataset implementation for Parquet sources (via PyArrow) so Parquet training/eval inputs can be logged to MLflow with inferred schema and lightweight metadata profiling.
Changes:
- Introduce
ParquetDataset(lazy-loaded PyArrow dataset) plusfrom_parquet()factory for MLflow input logging. - Export the new dataset utilities from
rationai.mlkit.mlflow. - Bump locked package version to
0.4.1.
Reviewed changes
Copilot reviewed 2 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
uv.lock |
Updates the locked project version to 0.4.1. |
rationai/mlkit/mlflow/parquet_dataset.py |
Implements ParquetDataset, digest computation, metadata profiling, and schema inference for Parquet datasets. |
rationai/mlkit/mlflow/__init__.py |
Exposes ParquetDataset and from_parquet as public mlflow utilities. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rationai/mlkit/mlflow/parquet_dataset.py`:
- Line 28: The constructor docstring in the class containing the __init__ method
still uses placeholder text (“Hety.”), so replace it with a concise, meaningful
one-line summary that describes what the constructor initializes. Update the
docstring near the __init__ definition in parquet_dataset.py so it accurately
reflects the purpose of the Parquet dataset constructor and matches the
surrounding class documentation style.
- Line 48: The md5 digest used for content-addressing in parquet_dataset should
be marked non-security so it works on FIPS-enabled interpreters. Update the
hashlib.md5() call in the digest setup to use usedforsecurity=False, keeping the
behavior the same while making the hash explicitly non-cryptographic.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 95fca85c-a42c-4095-bb59-1df26c29ad9e
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (11)
rationai/mlkit/data/datasets/slides_tiles_loader.pyrationai/mlkit/data/samplers/stratified_batch_sampler.pyrationai/mlkit/data/shard_parquet.pyrationai/mlkit/lightning/loggers/mlflow.pyrationai/mlkit/metrics/aggregated_metric_collection.pyrationai/mlkit/metrics/aggregators.pyrationai/mlkit/metrics/lazy_metric_dict.pyrationai/mlkit/metrics/nested_metric_collection.pyrationai/mlkit/mlflow/__init__.pyrationai/mlkit/mlflow/parquet_dataset.pyrationai/mlkit/with_cli_args.py
💤 Files with no reviewable changes (1)
- rationai/mlkit/data/shard_parquet.py
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a new lazy-loaded ParquetDataset for MLflow tracking, refactors various modules to improve type safety and clean up comments, and bumps the package version to 0.4.1. Key feedback includes addressing a critical bug in shard_parquet.py where shard indices and row counts are not updated when a shard reaches capacity, optimizing schema inference in parquet_dataset.py to avoid unnecessary I/O, and correcting the type hint for **kwargs in stratified_batch_sampler.py.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| if current_shard_rows >= rows_per_shard: | ||
| writer.close() | ||
| writer = None |
There was a problem hiding this comment.
There is a critical bug here: when a shard reaches its capacity (rows_per_shard), the writer is closed and set to None, but shard_idx is never incremented and current_shard_rows is never reset to 0.
This causes two major issues:
- The next batch will overwrite the same file (
shard_00000.parquet) becauseshard_idxremains0. current_shard_rowswill keep accumulating and stay aboverows_per_shard, causing a new file to be opened and closed on every single subsequent iteration.
We must increment shard_idx and reset current_shard_rows to 0 when closing the writer.
| if current_shard_rows >= rows_per_shard: | |
| writer.close() | |
| writer = None | |
| if current_shard_rows >= rows_per_shard: | |
| writer.close() | |
| writer = None | |
| shard_idx += 1 | |
| current_shard_rows = 0 |
There was a problem hiding this comment.
that is not true, the code is there already
| from functools import cached_property | ||
| from typing import Any | ||
|
|
||
| import pyarrow.dataset as ds |
| # Fetch an empty Pandas dataframe from the schema to utilize MLflow's built-in | ||
| # inference securely and correctly map PyArrow types to MLflow types. | ||
| empty_df = self._ds.head(0).to_pandas() | ||
| inferred_schema = _infer_schema(empty_df) | ||
| return inferred_schema | ||
| except Exception as e: |
There was a problem hiding this comment.
Instead of calling self._ds.head(0).to_pandas(), which can trigger I/O and requires PyArrow >= 12.0.0, you can construct an empty pyarrow.Table directly from the already-loaded self._ds.schema and convert that to a Pandas DataFrame. This is a zero-I/O, highly compatible, and fast operation.
| # Fetch an empty Pandas dataframe from the schema to utilize MLflow's built-in | |
| # inference securely and correctly map PyArrow types to MLflow types. | |
| empty_df = self._ds.head(0).to_pandas() | |
| inferred_schema = _infer_schema(empty_df) | |
| return inferred_schema | |
| except Exception as e: | |
| try: | |
| # Create an empty PyArrow Table from the schema and convert to Pandas | |
| # to avoid calling head(0) which may trigger I/O or require newer PyArrow versions. | |
| empty_df = pa.Table.from_batches([], schema=self._ds.schema).to_pandas() | |
| inferred_schema = _infer_schema(empty_df) | |
| return inferred_schema |
| def __init__( | ||
| self, | ||
| data: pd.DataFrame, | ||
| stratify_by: None, | ||
| stratify_by: str | list[str], | ||
| batch_size: int, | ||
| **kwargs: dict[str, Any], | ||
| ) -> None: |
There was a problem hiding this comment.
In Python type hinting, **kwargs: T specifies that all keyword arguments passed must be of type T. Specifying **kwargs: dict[str, Any] means each individual keyword argument must be a dictionary, which is likely not the intention here. It should be typed as **kwargs: Any.
| def __init__( | |
| self, | |
| data: pd.DataFrame, | |
| stratify_by: None, | |
| stratify_by: str | list[str], | |
| batch_size: int, | |
| **kwargs: dict[str, Any], | |
| ) -> None: | |
| def __init__( | |
| self, | |
| data: pd.DataFrame, | |
| stratify_by: str | list[str], | |
| batch_size: int, | |
| **kwargs: Any, | |
| ) -> None: |
Summary by CodeRabbit
New Features
ParquetDatasetsupport with lazy Parquet wrapping, dataset digest/profile metadata, and schema inference, plus afrom_parquet(...)helper.Bug Fixes
Improvements